package day04;

import beans.SensorReading;
import day03.window.FlinkWindow00;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * ProcessFunction API - KeyedProcessFunction
 *
 * @author lvbingbing
 * @date 2022-01-09 16:28
 */
public class ProcessFunction01 {
    public static void main(String[] args) throws Exception {
        // 1、创建 FlinkWindow00 对象，有参构造会初始化 env，并从socket文本流中读取数据
        int parallelism = 1;
        FlinkWindow00 flinkWindow = new FlinkWindow00(parallelism);
        // 2、获取可执行环境
        StreamExecutionEnvironment env = flinkWindow.getEnv();
        // 3、测试 keyedProcessFunction，先分组然后自定义处理
        studyKeyedProcessFunction(flinkWindow.getSingleOutputStreamOperator());
        // 4、触发程序执行
        env.execute();
    }

    /**
     * 测试 keyedProcessFunction，先分组然后自定义处理
     *
     * @param sensorReadingStream <br>
     */
    private static void studyKeyedProcessFunction(SingleOutputStreamOperator<SensorReading> sensorReadingStream) {
        sensorReadingStream.keyBy("id")
                .process(new MyKeyedProcessFunction())
                .print("keyedProcessFunction");
    }

    private static class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, SensorReading, Integer> {

        private static final long serialVersionUID = 2817018866357996794L;

        private transient ValueState<Long> tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("ts-timer", Long.class));
        }

        @Override
        public void processElement(SensorReading sensorReading, KeyedProcessFunction<Tuple, SensorReading, Integer>.Context ctx, Collector<Integer> out) throws Exception {

            out.collect(sensorReading.getId().length());
            // 当前正在处理的元素的时间戳。时间特性为：处理时间时，timestamp 为 null
            Long timestamp = ctx.timestamp();
            System.out.println("timestamp：" + timestamp);
            // 获取正在处理的元素的键
            Tuple currentKey = ctx.getCurrentKey();
            System.out.println("currentKey：" + currentKey);
            // 将记录发送到由OutputTag标识的侧输出。
            OutputTag<SensorReading> outputTag = new OutputTag<>(sensorReading.getId());
            ctx.output(outputTag, sensorReading);
            //  timerService
            TimerService timerService = ctx.timerService();
            // 获取当前的处理时间
            long currentProcessingTime = timerService.currentProcessingTime();
            System.out.println("currentProcessingTime：" + currentProcessingTime);
            // 当前事件时间的水位线
            long currentWatermark = timerService.currentWatermark();
            System.out.println("currentWatermark：" + currentWatermark);
            // 注册处理时间定时器
            timerService.registerProcessingTimeTimer(currentProcessingTime + 5000L);
            // 注册事件时间定时器
            timerService.registerEventTimeTimer((sensorReading.getTimestamp() + 10) * 1000L);
            // 删除处理时间定时器
            timerService.deleteProcessingTimeTimer(tsTimerState.value());

            // 更新当前状态
            tsTimerState.update(currentProcessingTime + 1000L);
        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Tuple, SensorReading, Integer>.OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + "定时器触发");
            Tuple currentKey = ctx.getCurrentKey();
            String field = currentKey.getField(0);
            System.out.println("当前key：" + field);
            // 当前的时间域。(处理时间/事件时间)
            TimeDomain timeDomain = ctx.timeDomain();
            System.out.println("当前的时间特性：" + timeDomain);
        }

        @Override
        public void close() throws Exception {
            tsTimerState.clear();
        }
    }
}
